Feature/remove large in clause in assets with cte and join#62114
Feature/remove large in clause in assets with cte and join#62114Nataneljpwd wants to merge 18 commits intoapache:mainfrom
Conversation
e2000b8 to
9110b93
Compare
…/remove-large-in-clause-in-assets
…/remove-large-in-clause-in-assets
2192078 to
ebc08dd
Compare
Asquator
left a comment
There was a problem hiding this comment.
Nice improvement, in clauses should always be avoided on values residing in the DB if possible.
| select(AssetModel) | ||
| .outerjoin(DagScheduleAssetReference) | ||
| .outerjoin(TaskOutletAssetReference) | ||
| .outerjoin(TaskInletAssetReference) | ||
| .group_by(AssetModel.id) | ||
| .order_by(orphaned) |
There was a problem hiding this comment.
Did you consider extracting this to a helper function in asset.py, like many others located there?
There was a problem hiding this comment.
I do not see a benefit from doing it, and so I did not do it, do you have a reason for the request? as I might have missed something
| active_assets_query = select(AssetActive.name, AssetActive.uri).join( | ||
| assets_query, | ||
| and_(AssetActive.name == assets_query.c.name, AssetActive.uri == assets_query.c.uri), |
There was a problem hiding this comment.
Can this be a helper function in asset.py too?
Just to avoid adding even more logic into the scheduler.
| and_(AssetActive.name == assets_query.c.name, AssetActive.uri == assets_query.c.uri), | ||
| ) | ||
|
|
||
| active_assets = session.execute(active_assets_query).all() |
There was a problem hiding this comment.
If there are users with thousands of active assets, I wonder if this may explode one day.
There was a problem hiding this comment.
it is a good point, maybe it is out of scope of the given PR, I might open a new PR for this after to handle large scale, as if, batching, yet as of now it is not an issue, and so for now I will leave it as is
| session.execute( | ||
| delete(AssetActive).where( | ||
| tuple_(AssetActive.name, AssetActive.uri).in_((a.name, a.uri) for a in assets) | ||
| def _orphan_unreferenced_assets(assets_query: CTE, *, session: Session) -> None: |
There was a problem hiding this comment.
Maybe we can avoid passing a CTE as an argument (which is not intuitive) by using the helper function.
There was a problem hiding this comment.
what do you suggest then?
it had the least amount of duplicated code, if there are any suggestions, I would be happy to hear
There was a problem hiding this comment.
asset_reference_query is a static query that never changes. If it's referenced in two places, maybe it's worth extracting it as a helper function, again?
There was a problem hiding this comment.
This way we won't be passing CTEs as method parameters
There was a problem hiding this comment.
it is harder to track that way in my opinion
way simpler to just see a query passed rather than go to a different method
There was a problem hiding this comment.
I don't think it's harder to track. As it's a constant, reusable CTE, I would put it as a cached util function in the corresponding module instead of generating it in the scheduler code.
…/remove-large-in-clause-in-assets
d8e235d to
ff0347f
Compare
| assets = select(AssetModel).where(assets_select_condition).cte() | ||
|
|
||
| if not AIRFLOW_V_3_2_PLUS: | ||
| assets = self.session.scalars(select(assets)).all() |
There was a problem hiding this comment.
For Airflow <3.2 this fallback currently uses scalars(select(assets)) where assets is a CTE built from select(AssetModel). scalars() returns only the first selected column, so this becomes a list of IDs (not AssetModel objects). That can break _activate_referenced_assets when it expects .name / .uri. Could we keep the old pre-3.2 materialization query, or join the CTE back to AssetModel before calling scalars()?
There was a problem hiding this comment.
Sure, I will join back to the asset model
|
|
||
| asset_models = session.scalars(select(AssetModel)).all() | ||
| assert len(asset_models) == 3 | ||
| asset_models = select(AssetModel).cte() |
There was a problem hiding this comment.
Would you add an explicit regression assertion for #61453's failure mode (large tuple-IN bind expansion)? These tests now validate behavior with a CTE input, but they don't directly guard against reintroducing a huge (name, uri) IN (...) path in scheduler asset activation.
There was a problem hiding this comment.
How do you think this can be added? As it does not cause failure when using in, rather just cause some slowdown
The only think I can think of is to check for the keyword 'in' for the str of the query
There was a problem hiding this comment.
found a way to make it work with event listeners in sqlalchemy, added the test
…/remove-large-in-clause-in-assets
|
Hello @kaxil, I have fixed the comments, I would appreciate a review |
Closes: #61453
This issue solves the large in clause using a cte with a join rather than batching
Was generative AI tooling used to co-author this PR?
{pr_number}.significant.rstor{issue_number}.significant.rst, in airflow-core/newsfragments.